// Copyright (C) Kumo inc. and its affiliates.
// Author: Jeff.li lijippy@163.com
// All rights reserved.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.
//

#pragma once



#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <new>
#include <stdexcept>
#include <type_traits>
#include <utility>

namespace nebula::util {

        namespace {
// Architecture switches. Each macro is always defined: 1 when the compiler
// predefines the matching architecture macro, 0 otherwise, so it can be used
// both in #if directives and in ordinary expressions.
#ifdef __arm__
#define QUEUE_ARM 1
#else
#define QUEUE_ARM 0
#endif

#ifdef __s390x__
#define QUEUE_S390X 1
#else
#define QUEUE_S390X 0
#endif

            // Typed, constexpr views of the macros above. They live in an
            // unnamed namespace, so every translation unit that includes this
            // header gets its own internal-linkage copy.
            constexpr bool kIsArchArm{QUEUE_ARM == 1};
            constexpr bool kIsArchS390X{QUEUE_S390X == 1};
        }  // namespace

        namespace {

            // Number of bytes used to pad SpscQueue's members apart:
            // 64 on ARM and s390x targets, 128 on every other target.
            constexpr std::size_t hardware_destructive_interference_size =
                    (!kIsArchArm && !kIsArchS390X) ? 128 : 64;

        }  // namespace

        /**
         * SpscQueue is a bounded queue for one producer and one consumer that
         * uses atomic indices instead of locks.
         *
         * The producer adds elements with write(); the consumer removes them with
         * read(), or inspects and removes them in place with FrontPtr() and
         * PopFront().
         *
         * Elements are stored in a ring of `size` slots obtained from std::malloc.
         * One slot is always left unused so that a full ring can be told apart
         * from an empty one, hence capacity() == size - 1. Because the storage
         * comes from std::malloc, T must not need stricter alignment than
         * std::malloc provides.
         *
         * The queue is neither copyable nor movable.
         */
        template <class T>
        struct SpscQueue {
            using value_type = T;

            SpscQueue(const SpscQueue&) = delete;
            SpscQueue& operator=(const SpscQueue&) = delete;

            /// Creates an empty queue backed by `size` slots.
            ///
            /// The number of usable slots at any given time is size - 1, so
            /// starting from an empty queue IsFull() returns true after
            /// size - 1 successful insertions.
            ///
            /// @param size number of slots in the ring; must be >= 2.
            /// @throws std::invalid_argument if size < 2.
            /// @throws std::bad_alloc if the slot storage cannot be allocated.
            explicit SpscQueue(std::uint32_t size)
                    : size_(size),
                      records_(Allocate(size)),
                      readIndex_(0),
                      writeIndex_(0) {}

            /// Destroys every element still queued and releases the storage.
            ~SpscQueue() {
                // We need to destruct anything that may still exist in our queue.
                // (No real synchronization needed at destructor time: only one
                // thread can be doing this.)
                if constexpr (!std::is_trivially_destructible<T>::value) {
                    std::size_t readIndex = readIndex_;
                    const std::size_t endIndex = writeIndex_;
                    while (readIndex != endIndex) {
                        records_[readIndex].~T();
                        if (++readIndex == size_) {
                            readIndex = 0;
                        }
                    }
                }

                std::free(records_);
            }

            /// Constructs a new element in place at the back of the queue.
            ///
            /// @param recordArgs arguments forwarded to T's constructor.
            /// @return true if the element was enqueued, false if the queue was
            ///         full (in which case no T is constructed).
            template <class... Args>
            bool write(Args&&... recordArgs) {
                const auto currentWrite = writeIndex_.load(std::memory_order_relaxed);
                const auto nextRecord = NextIndex(currentWrite);
                if (nextRecord == readIndex_.load(std::memory_order_acquire)) {
                    // queue is full
                    return false;
                }

                // Global placement new on a void*: a class-specific operator new
                // or an overloaded operator& on T cannot interfere.
                ::new (static_cast<void*>(records_ + currentWrite))
                        T(std::forward<Args>(recordArgs)...);
                // The release store publishes the constructed element; the
                // reader pairs it with an acquire load of writeIndex_.
                writeIndex_.store(nextRecord, std::memory_order_release);
                return true;
            }

            /// Moves the element at the front of the queue into `record` and
            /// removes it from the queue.
            ///
            /// @param record destination that is move-assigned from the front
            ///               element.
            /// @return true if an element was dequeued, false if the queue was
            ///         empty (in which case `record` is left untouched).
            bool read(T& record) {
                const auto currentRead = readIndex_.load(std::memory_order_relaxed);
                if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
                    // queue is empty
                    return false;
                }

                T& slot = records_[currentRead];
                record = std::move(slot);
                slot.~T();
                // The release store hands the slot back; the writer pairs it
                // with an acquire load of readIndex_.
                readIndex_.store(NextIndex(currentRead), std::memory_order_release);
                return true;
            }

            /// Returns a pointer to the element at the front of the queue (for
            /// use in place), or nullptr if the queue is empty. The element
            /// stays queued until PopFront() or read() removes it.
            T* FrontPtr() {
                const auto currentRead = readIndex_.load(std::memory_order_relaxed);
                if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
                    // queue is empty
                    return nullptr;
                }
                return records_ + currentRead;
            }

            /// Destroys and removes the element at the front of the queue.
            /// The queue must not be empty; this is only checked by an assert.
            void PopFront() {
                const auto currentRead = readIndex_.load(std::memory_order_relaxed);
                assert(currentRead != writeIndex_.load(std::memory_order_acquire));

                records_[currentRead].~T();
                readIndex_.store(NextIndex(currentRead), std::memory_order_release);
            }

            /// Returns true if the read and write indices were observed equal,
            /// i.e. no element was queued.
            bool IsEmpty() const {
                return readIndex_.load(std::memory_order_acquire) ==
                       writeIndex_.load(std::memory_order_acquire);
            }

            /// Returns true if advancing the write index would make it collide
            /// with the read index, i.e. a write() would have been rejected.
            bool IsFull() const {
                const auto nextRecord =
                        NextIndex(writeIndex_.load(std::memory_order_acquire));
                return nextRecord == readIndex_.load(std::memory_order_acquire);
            }

            /// Returns an estimate of the number of queued elements.
            ///
            /// * If called by consumer, then true size may be more (because
            ///   producer may be adding items concurrently).
            /// * If called by producer, then true size may be less (because
            ///   consumer may be removing items concurrently).
            /// * It is undefined to call this from any other thread.
            std::size_t SizeGuess() const {
                const Index write = writeIndex_.load(std::memory_order_acquire);
                const Index read = readIndex_.load(std::memory_order_acquire);
                if (write >= read) {
                    return static_cast<std::size_t>(write) - read;
                }
                // The write index has wrapped past the end of the ring and the
                // read index has not. read < size_ holds, so nothing underflows.
                return static_cast<std::size_t>(size_) - read + write;
            }

            /// Maximum number of items in the queue (one less than the slot
            /// count passed to the constructor).
            std::size_t capacity() const { return size_ - 1; }

        private:
            using Index = unsigned int;
            using AtomicIndex = std::atomic<Index>;

            // Validates the requested slot count and allocates raw,
            // uninitialised storage for `size` objects of type T. Runs before
            // any other member is initialised, so throwing here leaks nothing.
            static T* Allocate(std::uint32_t size) {
                if (size < 2) {
                    throw std::invalid_argument("SpscQueue: size must be >= 2");
                }
                // Guard the byte-count multiplication below against wrapping
                // (only possible where size_t is narrow).
                if (static_cast<std::size_t>(size) > SIZE_MAX / sizeof(T)) {
                    throw std::bad_alloc();
                }
                void* const storage = std::malloc(sizeof(T) * size);
                if (storage == nullptr) {
                    throw std::bad_alloc();
                }
                return static_cast<T*>(storage);
            }

            // Index of the slot after `index`, wrapping to 0 at the end of the
            // ring.
            Index NextIndex(Index index) const noexcept {
                const Index next = index + 1;
                return next == size_ ? 0 : next;
            }

            // The padding keeps the members below away from whatever precedes
            // the queue in memory, and keeps the two indices (each written by a
            // different side) apart from one another.
            char pad0_[hardware_destructive_interference_size];
            const std::uint32_t size_;
            T* const records_;

            AtomicIndex readIndex_;
            char pad1_[hardware_destructive_interference_size - sizeof(AtomicIndex)];
            AtomicIndex writeIndex_;

            char pad2_[hardware_destructive_interference_size - sizeof(AtomicIndex)];
        };

}  // namespace nebula::util

